package org.codehaus.activemq.transport.reliable;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.composite.CompositeTransportChannel;

/**
 * A composite transport channel that reacts to send failures and asynchronous
 * transport exceptions by calling {@code establishConnection()} again and
 * retrying.
 *
 * <p>The instance registers itself as both the packet listener and the
 * exception listener of the underlying channel (see {@link #configureChannel()}):
 * inbound packets are forwarded to the listener registered on this channel, and
 * exceptions reported by the underlying channel trigger a reconnect attempt.
 *
 * <p>NOTE(review): the numeric status codes passed to {@code fireStatusEvent}
 * (2, 3, 4) are defined outside this file; from their call sites they are
 * presumably "disconnected", "reconnected" and "failed" respectively -- confirm
 * against the status event class.
 */
public class ReliableTransportChannel extends CompositeTransportChannel
  implements PacketListener, ExceptionListener
{
  private static final Log log = LogFactory.getLog(ReliableTransportChannel.class);

  /** Guards reconnect attempts so only one thread re-establishes the connection at a time. */
  private final Object lock = new Object();

  // NOTE(review): the two fields below are not read or written anywhere in this class.
  private LinkedList packetList = new LinkedList();
  private boolean cacheMessagesForFailover;

  /**
   * Creates the channel and immediately calls {@code establishConnection()}.
   *
   * @param wireFormat wire format handed to the superclass
   * @param uris candidate URIs handed to the superclass
   * @throws JMSException if the superclass constructor or the initial
   *         {@code establishConnection()} call fails
   */
  public ReliableTransportChannel(WireFormat wireFormat, URI[] uris)
    throws JMSException
  {
    super(wireFormat, uris);
    establishConnection();
  }

  public String toString()
  {
    return "ReliableTransportChannel: " + this.channel;
  }

  /**
   * Starts the underlying channel, at most once: the {@code started} flag is
   * atomically flipped from {@code false} to {@code true}, and only the caller
   * that wins that transition starts the channel (if one is present).
   *
   * @throws JMSException if the underlying channel fails to start
   */
  public void start()
    throws JMSException
  {
    if (this.started.commit(false, true) && this.channel != null) {
      this.channel.start();
    }
  }

  /**
   * Sends a packet and waits for its receipt, reconnecting and retrying when
   * the underlying channel throws.
   *
   * @param packet the packet to send
   * @param timeout timeout passed through to the underlying channel
   * @return the receipt returned by the underlying channel
   * @throws JMSException if the send fails after this channel has been closed,
   *         or if a reconnect attempt fails
   */
  public Receipt send(Packet packet, int timeout)
    throws JMSException
  {
    while (true) {
      // Re-read the field on every attempt so that a retry uses the channel
      // installed by the reconnect instead of the one that just failed.
      TransportChannel tc = this.channel;
      try {
        return tc.send(packet, timeout);
      }
      catch (JMSException jmsEx) {
        // doReconnect() does nothing once closed; retrying would spin forever.
        if (this.closed.get()) {
          throw jmsEx;
        }
        doReconnect(tc);
      }
    }
  }

  /**
   * Sends a packet without waiting for a receipt, reconnecting and retrying
   * when the underlying channel throws. Returns as soon as one attempt
   * succeeds.
   *
   * @param packet the packet to send
   * @throws JMSException if the send fails after this channel has been closed,
   *         or if a reconnect attempt fails
   */
  public void asyncSend(Packet packet) throws JMSException
  {
    while (true) {
      // Re-read the field on every attempt; see send().
      TransportChannel tc = this.channel;
      try {
        tc.asyncSend(packet);
        // Success: leave the retry loop so the packet is sent exactly once.
        return;
      }
      catch (JMSException jmsEx) {
        // doReconnect() does nothing once closed; retrying would spin forever.
        if (this.closed.get()) {
          throw jmsEx;
        }
        doReconnect(tc);
      }
    }
  }

  /**
   * Registers this instance as the packet listener and exception listener of
   * the current underlying channel.
   */
  protected void configureChannel()
  {
    this.channel.setPacketListener(this);
    this.channel.setExceptionListener(this);
  }

  /**
   * Removes and returns one URI from the list. With a single element that
   * element is taken; with several, the index is chosen at random.
   *
   * @param list candidate URIs; the chosen element is removed from it
   * @return the chosen URI
   * @throws JMSException declared for overriding implementations; not thrown here
   */
  protected URI extractURI(List list) throws JMSException {
    int idx = 0;
    if (list.size() > 1) {
      SMLCGRandom rand = new SMLCGRandom();
      // Redraw until the index is within bounds of the list.
      do {
        idx = (int)(rand.nextDouble() * list.size());
      }
      while ((idx < 0) || (idx >= list.size()));
    }
    return (URI)list.remove(idx);
  }

  /**
   * Forwards a packet received from the underlying channel to the packet
   * listener registered on this channel, if any.
   *
   * @param packet the received packet
   */
  public void consume(Packet packet)
  {
    PacketListener listener = getPacketListener();
    if (listener != null) {
      listener.consume(packet);
    }
  }

  /**
   * Handles an exception reported by the underlying channel by attempting a
   * reconnect. If the reconnect itself fails, the reconnect failure is
   * reported to the registered exception listener with the original exception
   * attached as its linked exception.
   *
   * @param jmsEx the exception reported by the underlying channel
   */
  public void onException(JMSException jmsEx)
  {
    TransportChannel tc = this.channel;
    try {
      doReconnect(tc);
    }
    catch (JMSException ex) {
      ex.setLinkedException(jmsEx);
      fireException(ex);
    }
  }

  /**
   * Stops the channel via the superclass and then fires status code 2 for the
   * current URI.
   */
  public void stop() {
    super.stop();
    fireStatusEvent(this.currentURI, 2);
  }

  /**
   * Passes an exception to the registered exception listener, if any.
   *
   * @param jmsEx the exception to report
   */
  protected void fireException(JMSException jmsEx)
  {
    ExceptionListener listener = getExceptionListener();
    if (listener != null) {
      listener.onException(jmsEx);
    }
  }

  /**
   * Re-establishes the connection unless this channel is closed or another
   * thread has already replaced the failed channel.
   *
   * <p>Status code 2 is fired before the attempt, 3 after a successful
   * attempt and 4 when the attempt throws.
   *
   * @param currentChannel the channel the caller observed failing; the
   *        reconnect is skipped when {@code this.channel} no longer refers to it
   * @throws JMSException if {@code establishConnection()} fails
   */
  protected void doReconnect(TransportChannel currentChannel) throws JMSException
  {
    if (this.closed.get()) {
      return;
    }
    synchronized (this.lock)
    {
      // Identity check under the lock: skip if the channel was already swapped.
      if (this.channel != currentChannel) {
        return;
      }
      fireStatusEvent(this.currentURI, 2);
      try {
        establishConnection();
      }
      catch (JMSException jmsEx) {
        fireStatusEvent(this.currentURI, 4);
        throw jmsEx;
      }
      fireStatusEvent(this.currentURI, 3);
    }
  }
}